Crate object_store
This crate provides a uniform API for interacting with object storage services and local files via the ObjectStore trait.
Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.
Highlights
- A focused, easy to use, idiomatic, well documented, high performance, async API.
- Production quality, leading this crate to be used in large scale production systems, such as crates.io and InfluxDB IOx.
- Stable and predictable governance via the Apache Arrow project.

Originally developed for InfluxDB IOx and subsequently donated to Apache Arrow.
Available ObjectStore Implementations
By default, this crate provides the following implementations:
- Memory: InMemory
- Local filesystem: LocalFileSystem
Feature flags are used to enable support for other implementations:
- gcp: Google Cloud Storage support. See GoogleCloudStorageBuilder
- aws: Amazon S3 support. See AmazonS3Builder (sketched below)
- azure: Azure Blob Storage support. See MicrosoftAzureBuilder
- http: HTTP/WebDAV Storage support. See HttpBuilder
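For example, with the aws feature enabled, a store can be configured through AmazonS3Builder. A minimal sketch; the bucket name and region are placeholders, and from_env picks up credentials from the standard AWS environment variables:

use std::sync::Arc;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;

// Placeholder bucket and region, shown for illustration only;
// from_env reads credentials from the AWS environment variables
let s3: Arc<dyn ObjectStore> = Arc::new(
    AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket")
        .with_region("us-east-1")
        .build()
        .unwrap(),
);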
Adapters
ObjectStore instances can be composed with various adapters which add additional functionality:

- Rate Throttling: ThrottleConfig
- Concurrent Request Limit: LimitStore (see the sketch below)
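For example, LimitStore wraps any other ObjectStore and caps the number of in-flight requests. A minimal sketch, using the in-memory store as a stand-in for a real backend:

use object_store::limit::LimitStore;
use object_store::memory::InMemory;

// Allow at most 16 concurrent requests to the wrapped store
let store = LimitStore::new(InMemory::new(), 16);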
List objects
Use the ObjectStore::list method to iterate over objects in remote storage or files in the local filesystem.
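Note: the examples on this page call a get_object_store() helper whose definition is not shown. A minimal sketch, using the in-memory implementation as a stand-in for a real store:

use object_store::memory::InMemory;

// Return an in-memory store; a real application would construct a
// cloud-specific store here instead (e.g. via AmazonS3Builder)
fn get_object_store() -> InMemory {
    InMemory::new()
}

With that helper in place, listing proceeds as follows: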
use std::sync::Arc;
use object_store::{path::Path, ObjectStore};
use futures::stream::StreamExt;

// create an ObjectStore
let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());

// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix: Path = "data".try_into().unwrap();

// Get an `async` stream of Metadata objects:
let list_stream = object_store
    .list(Some(&prefix))
    .await
    .expect("Error listing files");

// Print a line about each object based on its metadata
// using for_each from `StreamExt` trait.
list_stream
    .for_each(move |meta| async {
        let meta = meta.expect("Error listing");
        println!("Name: {}, size: {}", meta.location, meta.size);
    })
    .await;
Which will print out something like the following:
Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...
Fetch objects
Use the ObjectStore::get method to fetch the data bytes from remote storage or files in the local filesystem as a stream.
use std::sync::Arc;
use object_store::{path::Path, ObjectStore};
use futures::stream::StreamExt;

// create an ObjectStore
let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());

// Retrieve a specific file
let path: Path = "data/file01.parquet".try_into().unwrap();

// fetch the bytes from object store
let stream = object_store
    .get(&path)
    .await
    .unwrap()
    .into_stream();

// Count the '0's using `map` from `StreamExt` trait
let num_zeros = stream
    .map(|bytes| {
        let bytes = bytes.unwrap();
        bytes.iter().filter(|b| **b == 0).count()
    })
    .collect::<Vec<usize>>()
    .await
    .into_iter()
    .sum::<usize>();

println!("Num zeros in {} is {}", path, num_zeros);
Which will print out something like the following:
Num zeros in data/file01.parquet is 657
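If only part of an object is needed, ObjectStore::get_range fetches a single contiguous byte range rather than streaming the whole object. A short sketch, assuming the file above is at least 1024 bytes long:

use std::sync::Arc;
use object_store::{path::Path, ObjectStore};

let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
let path: Path = "data/file01.parquet".try_into().unwrap();

// Fetch only the first 1024 bytes of the object
let prefix_bytes = object_store.get_range(&path, 0..1024).await.unwrap();
assert_eq!(prefix_bytes.len(), 1024);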
Put object
Use the ObjectStore::put method to save data in remote storage or the local filesystem.
use object_store::ObjectStore;
use object_store::path::Path;
use bytes::Bytes;
use std::sync::Arc;

let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
let path: Path = "data/file1".try_into().unwrap();
let bytes = Bytes::from_static(b"hello");
object_store
    .put(&path, bytes)
    .await
    .unwrap();
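A write can be confirmed with ObjectStore::head, which returns the ObjectMeta for a single object. Continuing directly from the example above:

// Fetch the metadata of the object that was just written
let meta = object_store.head(&path).await.unwrap();
assert_eq!(meta.size, 5); // b"hello" is five bytes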
Multipart put object
Use the ObjectStore::put_multipart method to save large amounts of data in chunks.
use object_store::ObjectStore;
use object_store::path::Path;
use bytes::Bytes;
use tokio::io::AsyncWriteExt;
use std::sync::Arc;

let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
let path: Path = "data/large_file".try_into().unwrap();
let (_id, mut writer) = object_store
    .put_multipart(&path)
    .await
    .unwrap();

let bytes = Bytes::from_static(b"hello");
writer.write_all(&bytes).await.unwrap();
writer.flush().await.unwrap();
writer.shutdown().await.unwrap();
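If a multipart upload fails partway through, already-uploaded parts may be left behind on some backends; ObjectStore::abort_multipart cleans them up. A sketch of the error path, assuming the same get_object_store() helper as above:

use object_store::ObjectStore;
use object_store::path::Path;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;

let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
let path: Path = "data/large_file".try_into().unwrap();
let (id, mut writer) = object_store.put_multipart(&path).await.unwrap();

// If a write fails, abort the upload so partially uploaded
// parts are cleaned up rather than left behind
if writer.write_all(b"hello").await.is_err() {
    object_store.abort_multipart(&path, &id).await.unwrap();
}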
Modules
- aws: An object store implementation for S3
- azure: An object store implementation for Azure blob storage
- buffered: Utilities for performing tokio-style buffered IO
- chunked: A ChunkedStore that can be used to test streaming behaviour
- delimited: Utility for streaming newline delimited files from object storage
- gcp: An object store implementation for Google Cloud Storage
- http: An object store implementation for generic HTTP servers
- limit: An object store that limits the maximum concurrency of the wrapped implementation
- local: An object store implementation for a local filesystem
- memory: An in-memory object store implementation
- multipart: Cloud Multipart Upload
- path: Path abstraction for Object Storage
- prefix: An object store wrapper handling a constant path prefix
- throttle: A throttling object store wrapper
Structs
- BackoffConfig: Exponential backoff with jitter
- ClientOptions: HTTP client configuration for remote object stores
- GetOptions: Options for a get request, such as range
- GetResult: Result for a get request
- ListResult: Result of a list call that includes objects, prefixes (directories) and a token for the next set of results. Individual result sets may be limited to 1,000 objects based on the underlying object storage's limitations.
- ObjectMeta: The metadata that describes an object
- RetryConfig: Contains the configuration for how to respond to server errors
- StaticCredentialProvider: A static set of credentials
Enums
- ClientConfigKey: Configuration keys for ClientOptions
- Error: A specialized Error for object store-related errors
- GetResultPayload: The kind of a GetResult
Constants
- OBJECT_STORE_COALESCE_DEFAULT: Range requests with a gap less than or equal to this will be coalesced into a single request by coalesce_ranges
Traits
- CredentialProvider: Provides credentials for use when signing requests
- ObjectStore: Universal API to multiple object store services
Functions
- coalesce_ranges: Takes a function fetch that can fetch a range of bytes and uses this to fetch the provided byte ranges
- collect_bytes: Collect a stream into Bytes, avoiding copying in the event of a single chunk
- parse_url: Create an ObjectStore based on the provided url (see the sketch after this list)
- parse_url_opts: Create an ObjectStore based on the provided url and options
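As an illustration of parse_url, the sketch below maps a file:// URL to the local filesystem implementation; this scheme needs no extra feature flags or credentials (Url comes from the url crate):

use object_store::parse_url;
use url::Url;

// A file:// URL resolves to the LocalFileSystem implementation
let url = Url::parse("file:///tmp/data/file01.parquet").unwrap();
let (store, path) = parse_url(&url).unwrap();
println!("store: {}, path: {}", store, path);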
Type Aliases
- DynObjectStore: An alias for a dynamically dispatched object store implementation
- MultipartId: Id type for multi-part uploads
- Result: A specialized Result for object store-related errors